package com.wt.demo.threadtest.niosocket;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Minimal non-blocking echo server built on {@code java.nio}.
 *
 * <p>A single selector thread (the caller of {@link #startServer()}) accepts connections and
 * performs all socket reads and writes. Every chunk that is read is handed to a worker pool,
 * which queues it on the connection's {@link EchoClient} and asks the selector to write it back.
 *
 * <p>Threading: each client's outgoing queue is touched by both the selector thread and the
 * worker threads, so every access to it is guarded by the {@link EchoClient} monitor.
 * {@link #timeStat} is only accessed from the selector thread by this class.
 *
 * <p>NOTE(review): chunks of one connection are dispatched to a cached thread pool, so two
 * chunks read back-to-back may be queued out of order - confirm whether ordering matters.
 *
 * @author: wangtao
 * @date:9:14 2018/6/13
 * @email:tao8.wang@changhong.com
 */
public class NIOSocketTest {

    /** Loopback address the server binds to. */
    private static final String HOST = "127.0.0.1";
    /** TCP port the server listens on. */
    private static final int PORT = 8000;
    /** Size of the buffer allocated for every read. */
    private static final int READ_BUFFER_SIZE = 8192;

    private Selector selector;
    private final ExecutorService exec = Executors.newCachedThreadPool();

    /** Time (ms) at which the first pending read of a connection was seen, keyed by its socket. */
    public static Map<Socket, Long> timeStat = new HashMap<>(10240);

    /**
     * Opens the server socket and runs the selector loop on the calling thread.
     *
     * <p>This method does not return normally. If it fails, the selector and the server channel
     * are closed before the exception propagates.
     *
     * @throws IOException if the selector or server channel cannot be opened or bound, or if a
     *                     selection operation fails
     */
    public void startServer() throws IOException {
        try (Selector openedSelector = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            selector = openedSelector;
            System.out.println("begain to open server on port " + PORT);
            SocketAddress address = new InetSocketAddress(HOST, PORT);
            ssc.configureBlocking(false);
            ssc.bind(address);
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            // Announce once, after the bind succeeded, instead of on every loop iteration.
            System.out.println("open server on port " + PORT);

            for (; ; ) {
                selector.select();
                Iterator<SelectionKey> its = selector.selectedKeys().iterator();
                while (its.hasNext()) {
                    SelectionKey readyKey = its.next();
                    its.remove();
                    handleReadyKey(readyKey);
                }
            }
        }
    }

    /** Dispatches one selected key to the accept / read / write handlers. */
    private void handleReadyKey(SelectionKey readyKey) {
        // The readiness accessors throw CancelledKeyException on a cancelled key.
        if (!readyKey.isValid()) {
            return;
        }
        if (readyKey.isAcceptable()) {
            doAccept(readyKey);
            return;
        }

        Socket socket = ((SocketChannel) readyKey.channel()).socket();
        if (readyKey.isReadable()) {
            if (!timeStat.containsKey(socket)) {
                timeStat.put(socket, System.currentTimeMillis());
            }
            doRead(readyKey);
        }
        // Checked independently of the read branch so that a client which keeps sending cannot
        // starve its own pending writes; doRead may have cancelled the key, hence isValid().
        if (readyKey.isValid() && readyKey.isWritable()) {
            doWrite(readyKey);
            long end = System.currentTimeMillis();
            // The entry is absent when an earlier write pass already reported the time or the
            // client was disconnected, so the boxed value must not be unboxed blindly.
            Long begin = timeStat.remove(socket);
            if (begin != null) {
                System.out.println("take time <" + (end - begin) + "> ms");
            }
        }
    }

    /**
     * Writes the oldest queued buffer of the connection and drops write interest once the queue
     * is drained. Runs on the selector thread.
     *
     * @param sk key of the client channel that became writable
     */
    private void doWrite(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        EchoClient echoClient = (EchoClient) sk.attachment();
        // Hold the client lock so "queue is empty -> stop watching OP_WRITE" cannot interleave
        // with a worker's "enqueue -> start watching OP_WRITE".
        synchronized (echoClient) {
            LinkedList<ByteBuffer> outq = echoClient.getOutq();
            try {
                if (!outq.isEmpty()) {
                    ByteBuffer bb = outq.getLast();
                    channel.write(bb);
                    // A partial write leaves the buffer queued for the next writable event.
                    if (!bb.hasRemaining()) {
                        outq.removeLast();
                    }
                }
            } catch (IOException e) {
                System.out.println("failed to write to client.");
                e.printStackTrace();
                disconnect(sk);
                return;
            }
            if (outq.isEmpty()) {
                sk.interestOps(SelectionKey.OP_READ);
            }
        }
    }

    /**
     * Reads one chunk from the connection and hands it to the worker pool. Disconnects the
     * client on end-of-stream or on a read error. Runs on the selector thread.
     *
     * @param sk key of the client channel that became readable
     */
    private void doRead(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        ByteBuffer bb = ByteBuffer.allocate(READ_BUFFER_SIZE);
        int len;
        try {
            len = channel.read(bb);
        } catch (IOException e) {
            System.out.println("failed to read from client.");
            e.printStackTrace();
            disconnect(sk);
            return;
        }
        if (len < 0) {
            disconnect(sk);
            return;
        }
        if (len == 0) {
            // Nothing was read, so there is nothing to echo.
            return;
        }
        bb.flip();
        exec.execute(new HandleMsg(sk, bb));
    }

    /**
     * Cancels the key, forgets the connection's timing entry and closes the channel.
     * Cancelling a key does not close its channel, so the channel is closed explicitly.
     *
     * @param sk key of the client channel to drop
     */
    private void disconnect(SelectionKey sk) {
        System.out.println("sk cancel");
        sk.cancel();
        SocketChannel channel = (SocketChannel) sk.channel();
        timeStat.remove(channel.socket());
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Accepts a pending connection and registers it for reads with a fresh {@link EchoClient}
     * attached.
     *
     * @param sk key of the server channel that became acceptable
     */
    private void doAccept(SelectionKey sk) {
        ServerSocketChannel channel = (ServerSocketChannel) sk.channel();
        SocketChannel clientChannel = null;
        try {
            clientChannel = channel.accept();
            if (clientChannel == null) {
                // In non-blocking mode accept() returns null when no connection is pending.
                return;
            }
            clientChannel.configureBlocking(false);
            // Attach at registration time so the key never exists without its EchoClient.
            clientChannel.register(selector, SelectionKey.OP_READ, new EchoClient());

            InetAddress clientAddress = clientChannel.socket().getInetAddress();
            System.out.println("Accept connection from " + clientAddress.getHostAddress() + ".");
        } catch (IOException e) {
            System.out.println("Failed to accept new client");
            e.printStackTrace();
            if (clientChannel != null) {
                // Do not leak a socket that was accepted but could not be registered.
                try {
                    clientChannel.close();
                } catch (IOException closeFailure) {
                    closeFailure.printStackTrace();
                }
            }
        }
    }


    /**
     * Per-connection state: the queue of buffers waiting to be written back.
     * Buffers are added at the head and written from the tail, i.e. in FIFO order.
     */
    class EchoClient {
        private LinkedList<ByteBuffer> outq;

        public EchoClient() {
            outq = new LinkedList<>();
        }

        /**
         * Returns the live queue. It is not thread-safe on its own; callers that share this
         * client between threads must hold this object's monitor while using it.
         */
        public synchronized LinkedList<ByteBuffer> getOutq() {
            return outq;
        }

        public synchronized void setOutq(LinkedList<ByteBuffer> outq) {
            this.outq = outq;
        }

        /** Queues a buffer behind all buffers queued earlier. */
        public synchronized void enqueue(ByteBuffer bb) {
            outq.addFirst(bb);
        }
    }

    /** Worker task that queues one received buffer for echoing and requests a write. */
    class HandleMsg implements Runnable {
        private SelectionKey sk;
        private ByteBuffer bb;

        public HandleMsg(SelectionKey sk, ByteBuffer bb) {
            this.sk = sk;
            this.bb = bb;
        }

        @Override
        public void run() {
            EchoClient echoClient = (EchoClient) sk.attachment();
            // Same lock as doWrite: enqueue and the interest change must be seen together.
            synchronized (echoClient) {
                if (!sk.isValid()) {
                    // The client was disconnected while this task was waiting in the pool.
                    return;
                }
                echoClient.enqueue(bb);
                try {
                    sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                } catch (java.nio.channels.CancelledKeyException ignored) {
                    // The selector thread cancelled the key after the isValid() check above;
                    // there is no longer anybody to echo to.
                    return;
                }
            }
            selector.wakeup();
        }
    }

    public static void main(String[] args) throws IOException {
        new NIOSocketTest().startServer();
    }
}
